参考文章
主要以一些官方文档为参考。
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics
https://help.aliyun.com/document_detail/34994.html?spm=a2c4g.11174283.6.650.6f02590e0d209m#h2-url-1
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-windows.html
http://xinhstechblog.blogspot.com/2016/04/spark-window-functions-for-dataframes.html
http://blog.madhukaraphatak.com/introduction-to-spark-two-part-5/
https://www.cnblogs.com/piaolingzxh/p/5538783.html
准备数据
1 | object WindowFunctionTest extends BaseSparkSession { |
平均移动值
DataFrame API方式实现
方式一:1
2
3// 窗口定义从 -1(前一行)到 1(后一行),每一个滑动的窗口总用有3行
val movinAvgSpec = Window.partitionBy("site").orderBy("date").rowsBetween(-1, 1)
df.withColumn("MovingAvg", avg(df("user_cnt")).over(movinAvgSpec)).show()
方式二:1
2
3
4
5
6
7 val movinAvgSpec = Window.partitionBy("site").orderBy("date").rowsBetween(-1, 1)
df.select(
$"site",
$"date",
$"amount",
avg($"user_cnt").over(movinAvgSpec).as("moving_avg_user_cnt")
).show()
sql方式实现
1 | df.createOrReplaceTempView("site_info") |
lag函数
说明:取当前记录的前x条数据的指定列,如果没有返回null,有就返回真实值。
DataFrame API方式实现
方式一:1
2val lagwSpec = Window.partitionBy("site").orderBy("date")
df.withColumn("prevUserCnt", lag(df("user_cnt"), 1).over(lagwSpec)).show()
方式二:1
2
3
4
5
6
7 val lagwSpec = Window.partitionBy("site").orderBy("date")
df.select(
$"site",
$"date",
$"amount",
lag($"user_cnt").over(movinAvgSpec).as("lag_user_cnt")
).show()
sql方式实现
1 | df.createOrReplaceTempView("site_info") |
lead函数
说明:取当前记录的后x条数据的指定列,如果没有返回null,有就返回真实值。
DataFrame API方式实现
方式一:1
2val leadwSpec = Window.partitionBy("site").orderBy("date")
df.withColumn("lead_user_cnt", lead(df("user_cnt"), 1).over(leadwSpec)).show()
方式二:1
2
3
4
5
6
7val leadwSpec = Window.partitionBy("site").orderBy("date")
df.select(
$"site",
$"date",
$"user_cnt",
lead($"user_cnt", 1).over(leadwSpec).as("lead_user_cnt")
).show()
sql方式实现
1 | spark.sql( |
结果
1 | +----+----------+--------+-------------+ |
FIRST_VALUE函数
说明:该函数用于获取分组排序后最第一条记录的字段值。
DataFrame API方式实现
方式一:1
2val firstValuewSpec = Window.partitionBy("site").orderBy("date")
df.withColumn("first_value_user_cnt", first("user_cnt").over(firstValuewSpec)).show()
方式二:1
2
3
4
5
6val firstValuewSpec = Window.partitionBy("site").orderBy("date")
df.select(
$"site",
$"date",
$"user_cnt",
first($"user_cnt").over(firstValuewSpec).as("first_value_user_cnt")).show()
sql方式实现
1 | spark.sql( |
结果
1 | +----+----------+--------+--------------------+ |
LAST_VALUE函数
说明:该函数用于获取分组排序后最后一条记录的字段值。
DataFrame API方式实现
方式一:1
2val lastValuewSpec = Window.partitionBy("site").orderBy("date").rowsBetween(Long.MinValue, Long.MaxValue)
df.withColumn("last_value_user_cnt", last("user_cnt").over(lastValuewSpec)).show()
方式二:1
2
3
4
5
6val lastValuewSpec = Window.partitionBy("site").orderBy("date").rowsBetween(Long.MinValue, Long.MaxValue)
df.select(
$"site",
$"date",
$"user_cnt",
last($"user_cnt").over(lastValuewSpec).as("last_value_user_cnt")).show()
sql方式实现
1 | spark.sql( |
结果
1 | +----+----------+--------+-------------------+ |
COUNT
说明:该函数用于计算计数值。
不指定order by
###
方式一:1
2val counWSpec = Window.partitionBy("site")
df.withColumn("count", count("user_cnt").over(counWSpec)).show()
方式二:1
2
3
4
5
6val counWSpec = Window.partitionBy("site")
df.select(
$"site",
$"date",
$"user_cnt",
count($"user_cnt").over(counWSpec).as("count")).show()
sql方式实现
1 | spark.sql( |
结果
1 | +----+----------+--------+-----+ |
指定order by
指定order by时,返回当前窗口内从开始行到当前行的累计计数值。
DataFrame API方式实现
方式一:1
2val counWSpec = Window.partitionBy("site").orderBy('date.asc)
df.withColumn("count", count("user_cnt").over(counWSpec)).show()
方式二:1
2
3
4
5
6val counWSpec = Window.partitionBy("site").orderBy("date")
df.select(
$"site",
$"date",
$"user_cnt",
count($"user_cnt").over(counWSpec).as("count"))
sql方式实现
1 | spark.sql( |
结果
1 | +----+----------+--------+-----+ |
sum函数
说明:该函数用于计算汇总值。
不指定order by
DataFrame API方式实现
方式一:1
2val sumWSpec = Window.partitionBy("site")
df.withColumn("sum_user_cnt", sum("user_cnt").over(sumWSpec)).show()
方式二:1
2
3
4
5
6
7val sumWSpec = Window.partitionBy("site")
df.select(
$"site",
$"date",
$"user_cnt",
sum($"user_cnt").over(sumWSpec).as("sum_user_cnt")
).show()
sql方式实现
1 | spark.sql( |
结果
1 | +----+----------+--------+------------+ |
指定order by
DataFrame API方式实现
方式一:1
2val sumWSpec = Window.partitionBy("site").orderBy('date asc).rowsBetween(Long.MinValue, Long.MaxValue)
df.withColumn("sum_user_cnt", sum("user_cnt").over(sumWSpec))
方式二:1
2
3
4
5
6
7val sumWSpec = Window.partitionBy("site").orderBy('date asc).rowsBetween(Long.MinValue, Long.MaxValue)
df.select(
$"site",
$"date",
$"user_cnt",
sum($"user_cnt").over(sumWSpec).as("sum_user_cnt")
).show()
sql方式实现
1 | spark.sql( |
结果
1 | +----+----------+--------+------------+ |
min函数
不指定order by
DataFrame API方式实现
方式一:1
2val minWSpec = Window.partitionBy("site")
df.withColumn("min_user_cnt", min("user_cnt").over(minWSpec)).show()
方式二:1
2
3
4
5
6df.select(
$"site",
$"date",
$"user_cnt",
min($"user_cnt").over(minWSpec)
).show()
sql方式实现
1 | spark.sql( |
结果
1 | +----+----------+--------+----------------------------------------------------------+ |
指定order by
方式一:1
2val minWSpec = Window.partitionBy("site").orderBy('date asc).rowsBetween(Long.MinValue, Long.MaxValue)
df.withColumn("min_user_cnt", min("user_cnt").over(minWSpec)).show()
方式二:1
2
3
4
5
6
7val minWSpec = Window.partitionBy("site").orderBy('date asc).rowsBetween(Long.MinValue, Long.MaxValue)
df.select(
$"site",
$"date",
$"user_cnt",
min($"user_cnt").over(minWSpec)
)
sql方式实现
1 | spark.sql( |
结果
1 | +----+----------+--------+----------------------------------------------------------+ |
max函数
不指定order by
DataFrame API方式实现
方式一:1
2val maxWSpec = Window.partitionBy("site")
df.withColumn("min_user_cnt", max("user_cnt").over(maxWSpec))
方式二:1
2
3
4
5
6
7val maxWSpec = Window.partitionBy("site")
df.select(
$"site",
$"date",
$"user_cnt",
max($"user_cnt").over(maxWSpec).as("max_user_cnt")
)
sql方式实现
1 | spark.sql( |
结果
1 | +----+----------+--------+------------+ |
指定order by
DataFrame API方式实现
方式一:1
2val maxWSpec = Window.partitionBy("site").orderBy('date asc).rowsBetween(Long.MinValue, Long.MaxValue)
df.withColumn("min_user_cnt", max("user_cnt").over(maxWSpec)).show()
方式二:1
2
3
4
5
6
7val maxWSpec = Window.partitionBy("site").orderBy('date asc).rowsBetween(Long.MinValue, Long.MaxValue)
df.select(
$"site",
$"date",
$"user_cnt",
max($"user_cnt").over(maxWSpec).as("max_user_cnt")
).show()
sql方式实现
1 | spark.sql( |
结果
1 | +----+----------+--------+------------+ |
avg函数
说明:该函数用于计算平均值。
不指定order by
DataFrame API方式实现
方式一:1
2val avgWSpec = Window.partitionBy("site")
df.withColumn("avg_user_cnt", avg("user_cnt").over(avgWSpec)).show()
方式二:1
2
3
4
5
6
7val avgWSpec = Window.partitionBy("site")
df.select(
$"site",
$"date",
$"user_cnt",
avg("user_cnt").over(avgWSpec).as("avg_user_cnt")
).show()
sql方式实现
1 | spark.sql( |
结果
1 | +----+----------+--------+------------+ |
指定order by
DataFrame API方式实现
方式一:1
2val avgWSpec = Window.partitionBy("site").orderBy("date").rowsBetween(Long.MinValue, Long.MaxValue)
df.withColumn("avg_user_cnt", avg("user_cnt").over(avgWSpec)).show()
方式二:1
2
3
4
5
6
7val avgWSpec = Window.partitionBy("site").orderBy("date").rowsBetween(Long.MinValue, Long.MaxValue)
df.select(
$"site",
$"date",
$"user_cnt",
avg("user_cnt").over(avgWSpec).as("avg_user_cnt")
)
sql方式实现
1 | spark.sql( |
结果
1 | +----+----------+--------+------------+ |
rank函数
说明:该函数用于计算排名。
DataFrame API方式实现
方式一:1
2val rankWSpec = Window.partitionBy("site").orderBy('user_cnt.desc)
df.withColumn("rank", rank().over(rankWSpec))
方式二:1
2
3
4
5
6
7val rankWSpec = Window.partitionBy("site").orderBy('user_cnt.desc)
df.select(
$"site",
$"date",
$"user_cnt",
rank().over(rankWSpec).as("rank")
).show()
sql方式实现
1 | spark.sql( |
结果
1 |
row_number over 函数
DataFrame API方式实现
方式一:1
2val rowNUmberWSpec = Window.partitionBy("site").orderBy('date desc, 'user_cnt desc)
df.withColumn("row_num", row_number().over(rowNUmberWSpec)).show()
方式二:1
2
3
4
5
6
7val rowNUmberWSpec = Window.partitionBy("site").orderBy('date desc, 'user_cnt desc)
df.select(
$"site",
$"date",
$"user_cnt",
row_number().over(rowNUmberWSpec).as("row_num")
).show()
sql方式实现
1 | spark.sql( |
结果
1 | +----+----------+--------+-------+ |
dense_rank函数
说明:该函数用于计算连续排名。
DataFrame API方式实现
方式一:1
2val denseRankWSpec = Window.partitionBy("site").orderBy('date asc)
df.withColumn("dense_rank", dense_rank() over (denseRankWSpec)).show()
方式二:1
2
3
4
5
6
7val denseRankWSpec = Window.partitionBy("site").orderBy('date asc)
df.select(
$"site",
$"date",
$"user_cnt",
dense_rank().over(denseRankWSpec).as("dense_rank")
).show()
sql方式实现
1 | spark.sql( |
结果
1 | +----+----------+--------+----------+ |
percent_rank函数
说明:该函数用于计算一组数据中某行的相对排名。
DataFrame API方式实现
方式一:1
2val percentRankWSpec = Window.partitionBy("site").orderBy('date asc)
df.withColumn("percent_rank", percent_rank() over (percentRankWSpec)).show()
方式二:1
2
3
4
5
6
7val percentRankWSpec = Window.partitionBy("site").orderBy('date asc)
df.select(
$"site",
$"date",
$"user_cnt",
percent_rank().over(percentRankWSpec).as("percent_rank")
).show()
sql方式实现
1 | spark.sql( |
结果
1 | |site| date|user_cnt|percent_rank| |
ntile函数
说明:用于将分组数据按照顺序切分成n片,并返回当前切片值,如果切片不均匀,默认增加第一个切片的分布。
DataFrame API方式实现
方式一:1
2val ntileRankWSpec = Window.partitionBy("site").orderBy('date asc)
df.withColumn("ntile", ntile(2).over(ntileRankWSpec)).show()
方式二:1
2
3
4
5
6
7val ntileRankWSpec = Window.partitionBy("site").orderBy('date asc)
df.select(
$"site",
$"date",
$"user_cnt",
ntile(2).over(ntileRankWSpec).as("ntile")
)
sql方式实现
1 | spark.sql( |
结果
1 | +----+----------+--------+-----+ |
致谢!
本人能力有限,博客错误难免,有错往将错误发送到邮箱(t_spider@aliyun.com)